Node.js Streams

Node.js

धाराएँ क्या हैं?

Node.js , , .

इन्हें कन्वेयर बेल्ट के रूप में सोचें जो डेटा को एक स्थान से दूसरे स्थान तक ले जाते हैं, जिससे आपको पूरे डेटा सेट की प्रतीक्षा करने के बजाय प्रत्येक टुकड़े पर काम करने की अनुमति मिलती है।

स्ट्रीम Node.js की सबसे शक्तिशाली विशेषताओं में से एक हैं और इनका व्यापक रूप से उपयोग किया जाता है:

फ़ाइल स्वरूप फ़ंक्शंस

फ़ाइलें ले जाना/लिखना

HTTP अनुरोध और प्रतिक्रियाएँ

वेब अनुरोधों और प्रतिक्रियाओं को संभालना

डेटा संपीड़न और विस्तार

डेटा को संक्षिप्त और विस्तृत करें

डेटाबेस संचालन

डेटाबेस संचालन को संभालना

वास्तविक समय डेटा प्रोसेसिंग

वास्तविक समय डेटा विनिमय और प्रसंस्करण

स्ट्रीम के साथ शुरुआत करना

Node.js में डेटा के कुशल हेरफेर के लिए स्ट्रीम मूलभूत अवधारणाओं में से एक है।

वे आपको डेटा को उपलब्ध होने पर टुकड़ों में संसाधित करने की अनुमति देते हैं, बजाय एक ही बार में सब कुछ मेमोरी में लोड करने के।

मूल स्ट्रीम उदाहरण

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
writableStream.on('finish', () => {
  console.log('File copy completed!');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

स्ट्रीम का उपयोग क्यों करें?

स्ट्रीम का उपयोग करने के कई लाभ हैं:

मेमोरी क्षमता:बड़ी फ़ाइलों को पूरी तरह से मेमोरी में लोड किए बिना संसाधित करें
समय कौशल:सभी डेटा की प्रतीक्षा करने के बजाय, डेटा उपलब्ध होते ही प्रोसेसिंग शुरू कर दें
सहयोगात्मकता:स्ट्रीम को संयोजित करके शक्तिशाली डेटा पाइपलाइन बनाएं
बेहतर उपयोगकर्ता अनुभव:डेटा उपलब्ध होते ही उपयोगकर्ताओं को परोसें (उदाहरण के लिए, वीडियो स्ट्रीमिंग)

💡धाराओं के लाभ:

512MB RAM वाले सर्वर पर 1GB फ़ाइल ले जाने की कल्पना करें:

  • धाराओं के बिना:संपूर्ण फ़ाइल को मेमोरी में लोड करने का प्रयास करते समय प्रक्रिया क्रैश हो जाती है
  • धाराओं के साथ:आप फ़ाइल को छोटे-छोटे टुकड़ों में संसाधित करते हैं (उदाहरण के लिए, एक समय में 64KB)।

महत्वपूर्ण स्ट्रीम प्रकार

Node.js , :

स्ट्रीम प्रकार व्याख्या सामान्य उदाहरण
Readable स्ट्रीम जो डेटा पास कर सकती हैं (डेटा द्वारा) fs.createReadStream(), HTTP , process.stdin
Writable स्ट्रीम जिसमें डेटा लिखा जा सकता है (डेटा गंतव्य) fs.createWriteStream(), HTTP , process.stdout
Duplex स्ट्रीम पढ़ने योग्य और लिखने योग्य दोनों हैं टीसीपी सॉकेट, ज़्लिब स्ट्रीम
Transform डुप्लेक्स स्ट्रीम जो डेटा को लिखते और प्रसारित करते समय बदल सकती है ज़्लिब धाराएँ, क्रिप्टो धाराएँ

⚠️नोट:

Node.js EventEmitter , .

पठनीय धाराएँ

पठनीय धाराएँ आपको किसी स्रोत से डेटा पढ़ने की अनुमति देती हैं। उदाहरण:

एक पठनीय स्ट्रीम बनाना

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Events for readable streams
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  console.log(chunk);
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});

readableStream.on('error', (err) => {
  console.error('Error reading from stream:', err);
});

कदम बढ़ाने के तरीके

पठनीय धाराएँ दो तरीकों में से एक में काम करती हैं:

const fs = require('fs');

// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Manually consume the stream using read()
readableStream.on('readable', () => {
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    console.log(`Read ${chunk.length} bytes of data.`);
    console.log(chunk);
  }
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});

लिखने योग्य धाराएँ

लिखने योग्य स्ट्रीम आपको किसी गंतव्य पर डेटा लिखने की अनुमति देती हैं। उदाहरण:

एक लिखने योग्य स्ट्रीम बनाना

const fs = require('fs');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');

// End the stream
writableStream.end();

// Events for writable streams
writableStream.on('finish', () => {
  console.log('All data has been written to the file.');
});

writableStream.on('error', (err) => {
  console.error('Error writing to stream:', err);
});

अभिघातज के बाद के तनाव का प्रबंधन

स्ट्रीम पर लिखते समय, विलंबता तब होती है जब डेटा संसाधित होने की तुलना में तेज़ी से लिखा जाता है।

राइट() विधि एक बूलियन मान लौटाती है जो यह दर्शाती है कि लिखना जारी रखना सुरक्षित है या नहीं।

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

function writeData() {
  let i = 100;
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time, close the stream
        writableStream.write('Last chunk!\n');
        writableStream.end();
      } else {
        // Continue writing data
        const data = `Data chunk ${i}\n`;
        // Write and check if we should continue
        ok = writableStream.write(data);
      }
    }
    while (i > 0 && ok);

    if (i > 0) {
      // We need to wait for the drain event before writing more
      writableStream.once('drain', write);
    }
  }
  write();
}

writeData();
writableStream.on('finish', () => {
  console.log('All data written successfully.');
});

Pipe

पाइप() विधि एक पठनीय स्ट्रीम को एक लिखने योग्य स्ट्रीम से जोड़ती है, स्वचालित रूप से डेटा के प्रवाह को प्रबंधित करती है, और बाद के संपीड़न को संभालती है।

यह धाराओं का उपभोग करने का एक आसान तरीका है।

const fs = require('fs');

// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');

// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
readableStream.on('error', (err) => {
  console.error('Read error:', err);
});

writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

writableStream.on('finish', () => {
  console.log('File copy completed!');
});

चेन पाइप

आप पाइप() का उपयोग करके कई स्ट्रीम को एक साथ जोड़ सकते हैं।

ट्रांसफ़ॉर्म स्ट्रीम के साथ काम करते समय यह विशेष रूप से उपयोगी है।

const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
  .pipe(zlib.createGzip()) // Compress the data
  .pipe(fs.createWriteStream('destination.txt.gz'))
  .on('finish', () => {
    console.log('File compressed successfully!');
  });

💡नोट:

पाइप() विधि गंतव्य स्ट्रीम लौटाती है, जो श्रृंखला चलाती है।

डुप्लेक्स और ट्रांसफ़ॉर्म धाराएँ

द्वैध धाराएँ

डुप्लेक्स स्ट्रीम दो-तरफा पाइप की तरह पढ़ने योग्य और लिखने योग्य दोनों हैं।

टीसीपी सॉकेट डुप्लेक्स स्ट्रीम का एक अच्छा उदाहरण है।

const net = require('net');

// Create a TCP server
const server = net.createServer((socket) => {
  // 'socket' is a duplex stream

  // Handle incoming data (readable side)
  socket.on('data', (data) => {
    console.log('Received:', data.toString());

    // Echo back (writable side)
    socket.write(`Echo: ${data}`);
  });

  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

server.listen(8080, () => {
  console.log('Server listening on port 8080');
});

// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
  console.log('Connected to server');
  client.write('Hello from client!');
});

client.on('data', (data) => {
  console.log('Server says:', data.toString());
  client.end(); // Close the connection
});
*/

धाराओं को रूपांतरित करें

ट्रांसफ़ॉर्म स्ट्रीम डुप्लेक्स स्ट्रीम हैं जो डेटा को अपने पथ से गुजरते समय रूपांतरित कर सकती हैं।

वे पाइपों पर डेटा संसाधित करने के लिए आदर्श हैं।

const { Transform } = require('stream');
const fs = require('fs');

// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Transform the chunk to uppercase
    const upperChunk = chunk.toString().toUpperCase();
    // Push the transformed data
    this.push(upperChunk);
    // Signal that we're done with this chunk
    callback();
  }
}

// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');

// Pipe the data through our transform stream
readableStream
  .pipe(uppercaseTransform)
  .pipe(writableStream)
  .on('finish', () => {
    console.log('Transformation completed!');
  });

स्ट्रीम इवेंट

सभी स्ट्रीम इवेंटएमिटर के उदाहरण हैं और कई इवेंट उत्सर्जित करते हैं:

पठनीय स्ट्रीम घटनाएँ

लिखने योग्य स्ट्रीम इवेंट

stream.pipeline() तरीका

पाइपलाइन() फ़ंक्शन (Node.js v10.0.0 के बाद से उपलब्ध) स्ट्रीम को एक साथ श्रृंखलाबद्ध करने का एक अधिक मजबूत तरीका है, विशेष रूप से त्रुटि प्रबंधन के लिए।

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline that handles errors properly
pipeline(
  fs.createReadStream('source.txt'),
  zlib.createGzip(),
  fs.createWriteStream('destination.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded!');
    }
  }
);

💡नोट:

यदि उनमें से कोई भी त्रुटि हो तो पाइपलाइन() सभी स्ट्रीम को ठीक से साफ़ कर देगी, जिससे संभावित मेमोरी लीक को रोका जा सकेगा।

ऑब्जेक्ट मोड स्ट्रीम

डिफ़ॉल्ट रूप से, स्ट्रीम स्ट्रिंग्स और बफ़र ऑब्जेक्ट के साथ काम करती हैं।

हालाँकि, जावास्क्रिप्ट ऑब्जेक्ट के साथ काम करने के लिए स्ट्रीम को 'ऑब्जेक्ट मोड' पर सेट किया जा सकता है।

const { Readable, Writable, Transform } = require('stream');

// Create a readable stream in object mode
const objectReadable = new Readable({
  objectMode: true,
  read() {} // Implementation required but can be no-op
});

// Create a transform stream in object mode
const objectTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Add a property to the object
    chunk.transformed = true;
    chunk.timestamp = new Date();
    this.push(chunk);
    callback();
  }
});

// Create a writable stream in object mode
const objectWritable = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log('Received object:', chunk);
    callback();
  }
});

// Connect the streams
objectReadable
  .pipe(objectTransform)
  .pipe(objectWritable);

// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data

उन्नत स्ट्रीम विधियाँ

1. पाइपलाइन के साथ त्रुटि प्रबंधन()

स्ट्रीम श्रृंखलाओं में त्रुटियों को संभालने के लिए पाइपलाइन() विधि अनुशंसित तरीका है:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
   if (err) {
    console.error('Pipeline failed:', err);
   } else {
    console.log('Pipeline succeeded');
   }
  }
);

2. ऑब्जेक्ट मोड स्ट्रीम

स्ट्रीम जावास्क्रिप्ट ऑब्जेक्ट के साथ काम कर सकती हैं, न कि केवल स्ट्रिंग और बफ़र्स के साथ:

const { Readable } = require('stream');

// Create a readable stream in object mode
const objectStream = new Readable({
  objectMode: true,
  read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
  console.log('Received:', obj);
});

व्यावहारिक उदाहरण

HTTP स्ट्रीमिंग

HTTP अनुरोधों और प्रतिक्रियाओं में स्ट्रीम का व्यापक रूप से उपयोग किया जाता है।

const http = require('http');
const fs = require('fs');

// Create an HTTP server
const server = http.createServer((req, res) => {
  // Handle different routes
  if (req.url === '/') {
    // Send a simple response
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end('

Stream Demo

Try streaming a file or streaming a video.

'); } else if (req.url === '/file') { // Stream a large text file res.writeHead(200, { 'Content-Type': 'text/plain' }); const fileStream = fs.createReadStream('largefile.txt', 'utf8'); // Pipe the file to the response (handles backpressure automatically) fileStream.pipe(res); // Handle errors fileStream.on('error', (err) => { console.error('File stream error:', err); res.statusCode = 500; res.end('Server Error'); }); } else if (req.url === '/video') { // Stream a video file with proper headers const videoPath = 'video.mp4'; const stat = fs.statSync(videoPath); const fileSize = stat.size; const range = req.headers.range; if (range) { // Handle range requests for video seeking const parts = range.replace(/bytes=/, "").split("-"); const start = parseInt(parts[0], 10); const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; const chunksize = (end - start) + 1; const videoStream = fs.createReadStream(videoPath, { start, end }); res.writeHead(206, { 'Content-Range': `bytes ${start}-${end}/${fileSize}`, 'Accept-Ranges': 'bytes', 'Content-Length': chunksize, 'Content-Type': 'video/mp4' }); videoStream.pipe(res); } else { // No range header, send entire video res.writeHead(200, { 'Content-Length': fileSize, 'Content-Type': 'video/mp4' }); fs.createReadStream(videoPath).pipe(res); } } else { // 404 Not Found res.writeHead(404, { 'Content-Type': 'text/plain' }); res.end('Not Found'); } }); // Start the server server.listen(8080, () => { console.log('Server running at http://localhost:8080/'); });

बड़ी CSV फ़ाइलें संसाधित करना

const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser

// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Only pass through rows that meet our criteria
    if (parseInt(row.age) > 18) {
      // Modify the row
      row.isAdult = 'Yes';
      // Push the transformed row
      this.push(row);
    }
    callback();
  }
});

// Create a writable stream for the results
const results = [];
const writeToArray = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    results.push(row);
    callback();
  }
});

// Create the processing pipeline
fs.createReadStream('people.csv')
  .pipe(csv())
  .pipe(filterTransform)
  .pipe(writeToArray)
  .on('finish', () => {
    console.log(`Processed ${results.length} records:`);
    console.log(results);
  })
  .on('error', (err) => {
    console.error('Error processing CSV:', err);
  });

सर्वोत्तम अभ्यास

त्रुटि प्रबंधन:एप्लिकेशन को क्रैश होने से बचाने के लिए हमेशा स्ट्रीम में त्रुटि घटनाओं को संभालें
पाइपलाइन() का उपयोग करें:बेहतर त्रुटि प्रबंधन और सफ़ाई के लिए स्ट्रीम.पाइपलाइन() से .पाइप() को प्राथमिकता दें
फिर तनाव को संभालें:स्मृति समस्याओं से बचने के लिए राइट() के रिटर्न मान का सम्मान करें
धाराएँ समाप्त करें:जब आपका काम पूरा हो जाए तो लिखने योग्य स्ट्रीम पर एंड() को कॉल करें
समवर्ती परिचालन से बचें:स्ट्रीम हैंडलर के अंदर एसिंक्रोनस ऑपरेशंस के साथ इवेंट लूप को ब्लॉक न करें
बफ़र का आकार:हाईवॉटरमार्क (बफर आकार) सेटिंग्स के बारे में सावधान रहें

⚠️चेतावनी:

स्ट्रीम को गलत तरीके से संभालने से मेमोरी लीक और प्रदर्शन संबंधी समस्याएं हो सकती हैं।

हमेशा त्रुटियों को संभालें और स्ट्रीम को ठीक से समाप्त करें।

सारांश

Node.js में स्ट्रीम एक मौलिक अवधारणा है जो खुले डेटा हेरफेर की अनुमति देती है। वे हैं:

अभ्यास

पठनीय स्ट्रीम को लिखने योग्य स्ट्रीम से जोड़ने के लिए किस विधि का उपयोग किया जाता है?

connect()
✗ ग़लत! Node.js स्ट्रीम पर "कनेक्ट()" एक मान्य विधि नहीं है
pipe()
✓ ठीक है! "पाइप()" विधि एक पठनीय स्ट्रीम को एक लिखने योग्य स्ट्रीम से जोड़ने का उचित तरीका है
link()
✗ ग़लत! Node.js स्ट्रीम पर "लिंक()" एक मान्य विधि नहीं है
flow()
✗ ग़लत! Node.js स्ट्रीम पर "प्रवाह ()" एक मान्य विधि नहीं है